package com.bruce.tool.mq.rocket.core;

import com.bruce.tool.common.exception.BaseRuntimeException;
import com.bruce.tool.mq.rocket.config.RocketConfig;
import com.bruce.tool.mq.rocket.constant.RocketCode;
import com.bruce.tool.mq.rocket.constant.RocketPool;
import com.bruce.tool.mq.rocket.domain.RocketResponse;
import com.bruce.tool.mq.rocket.handler.RocketHandler;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.Objects;

/**
 * Starts the RocketMQ push consumers registered in the {@link RocketPool} and
 * routes what they receive to a caller-supplied {@link RocketHandler}.
 *
 * <p>Consumers and their configuration are looked up by a string code; both
 * must already be present in the pool.
 *
 * @author : Bruce(刘正航) 10:02 2019-02-21
 */
@Component
public class RocketReceiver {

    @Autowired
    private RocketPool pool;

    /**
     * Subscribes the consumer registered under {@code code} and passes the raw
     * consumer to {@code handler} before starting it. No message listener is
     * registered here; presumably the handler installs its own.
     *
     * @param code    key of the consumer and its configuration in the pool
     * @param handler callback that receives the consumer; must not be null
     * @throws BaseRuntimeException if the consumer or its configuration is
     *                              missing, or the RocketMQ client fails
     */
    public void origin(String code, RocketHandler handler){
        this.execute(code,handler,true);
    }

    /**
     * Subscribes the consumer registered under {@code code}, registers a
     * concurrent listener that forwards each message to {@code handler} as a
     * {@link RocketResponse}, and starts the consumer.
     *
     * @param code    key of the consumer and its configuration in the pool
     * @param handler callback invoked once per received message; must not be null
     * @throws BaseRuntimeException if the consumer or its configuration is
     *                              missing, or the RocketMQ client fails
     */
    public void execute(String code,RocketHandler handler){
        this.execute(code,handler,false);
    }

    /**
     * Shared implementation behind {@link #origin} and the public
     * {@link #execute(String, RocketHandler)}.
     *
     * @param code    key of the consumer and its configuration in the pool
     * @param handler callback for the raw consumer or for each message
     * @param isBase  {@code true} to hand the raw consumer to the handler,
     *                {@code false} to register the per-message listener
     */
    private void execute(String code, RocketHandler handler, boolean isBase){
        DefaultMQPushConsumer consumer = pool.getReceivers().get(code);
        if( Objects.isNull(consumer) ){
            throw new BaseRuntimeException(RocketCode.MQERROR_RECEIVE_NULL.getCode(),RocketCode.MQERROR_RECEIVE_NULL.getMessage());
        }
        RocketConfig config = pool.getConfigs().get(code);
        if( Objects.isNull(config) ){
            // NOTE(review): a missing receiver configuration is reported with the
            // SENDER_NULL code - confirm this is the intended error code.
            throw new BaseRuntimeException(RocketCode.MQERROR_SENDER_NULL.getCode(),RocketCode.MQERROR_SENDER_NULL.getMessage());
        }
        // Fail fast: without this a null handler is only noticed after the
        // consumer has already been unsubscribed and shut down, or later on
        // the consume thread.
        Objects.requireNonNull(handler, "handler");

        // Reset any previous subscription before re-subscribing below.
        // NOTE(review): a push consumer that was already running may refuse to
        // start again after shutdown() - confirm against the pool's lifecycle.
        consumer.unsubscribe(config.getTopic());
        consumer.shutdown();

        try {
            consumer.subscribe(config.getTopic(),config.getTags());

            if( isBase ){
                handler.handle(consumer);
                consumer.start();
                return;
            }

            consumer.registerMessageListener((MessageListenerConcurrently) (list, context) -> {
                for (MessageExt message : list) {
                    RocketResponse response = RocketResponse.builder()
                            .messageId(message.getMsgId())
                            .topic(message.getTopic())
                            .body(message.getBody())
                            .timestamp(message.getBornTimestamp()).build();
                    ConsumeConcurrentlyStatus status = handler.handle(response);
                    // One status covers the whole batch, so stop at the first
                    // message that was not consumed successfully. Letting a later
                    // success overwrite it would acknowledge the failed message.
                    if( status != ConsumeConcurrentlyStatus.CONSUME_SUCCESS ){
                        return status;
                    }
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            });
            consumer.start();
        } catch (MQClientException e) {
            BaseRuntimeException failure = new BaseRuntimeException(RocketCode.MQERROR_RECEIVE_EXCEPTION.getCode(),RocketCode.MQERROR_RECEIVE_EXCEPTION.getMessage());
            // Only the (code, message) constructor is used in this file, so keep
            // the client error attached as a suppressed exception instead of
            // dropping its stack trace.
            failure.addSuppressed(e);
            throw failure;
        }
    }
}
